HDFS API Operations

Posted by Jackson on 2017-08-22

HDFS Java API Operations

First, complete the FileSystem initialization and set up the configuration needed to connect to the cluster:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.After;
import org.junit.Before;

import java.net.URI;

public class HDFSAPITest {

    FileSystem fileSystem;

    /**
     * Initialize the FileSystem in the @Before method
     */
    @Before
    public void setUp() throws Exception {
        URI uri = new URI("hdfs://bigdata01:9000");
        Configuration configuration = new Configuration();

        //configuration.set("dfs.client.use.datanode.hostname", "true");
        //configuration.set("dfs.replication", "1");
        fileSystem = FileSystem.get(uri, configuration, "hadoop");
    }

    /**
     * Close the FileSystem in the @After method
     */
    @After
    public void tearDown() throws Exception {
        if (null != fileSystem) {
            fileSystem.close();
        }
    }
}
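As a side note, the NameNode address can also be supplied through the Configuration instead of an explicit URI; a minimal sketch (same bigdata01:9000 address as above, but running as the current OS user, since no user name is passed to FileSystem.get):

    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS", "hdfs://bigdata01:9000");
    FileSystem fileSystem = FileSystem.get(configuration);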

Create a directory on HDFS

/*
 * @description: Create a directory on HDFS
 */
@Test
public void mkdir() throws Exception {
    fileSystem.mkdirs(new Path("/hdfsapi2"));
}

Delete a directory on HDFS

/*
 * @description: Delete a directory on HDFS
 */
@Test
public void delete() throws Exception {
    // the second argument enables recursive deletion
    fileSystem.delete(new Path("/hdfsapi2"), true);
}

Copy a local file to HDFS

/*
 * @description: Copy a local file to HDFS
 */
@Test
public void copyFromLocalFile() throws Exception {
    Path src = new Path("data/wcfile.txt");
    Path des = new Path("/hdfsapi");
    fileSystem.copyFromLocalFile(src, des);
}

Rename a file on HDFS

/*
 * @description: Rename a file on HDFS; equivalent to a move operation
 */
@Test
public void renameFile() throws IOException {
    Path src = new Path("/hdfsapi/wcfile.txt");
    Path des = new Path("/hdfsapi/output/wcfile.txt");
    fileSystem.rename(src, des);
}
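Note that rename does not throw when it cannot move the file; it simply returns false, for example when the target directory /hdfsapi/output does not yet exist. A small sketch (reusing the fileSystem field initialized in setUp and the same paths as above) that checks the result:

    boolean renamed = fileSystem.rename(new Path("/hdfsapi/wcfile.txt"), new Path("/hdfsapi/output/wcfile.txt"));
    if (!renamed) {
        // rename returns false instead of throwing, e.g. when /hdfsapi/output is missing
        System.out.println("rename failed");
    }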

Get file information from HDFS

/*
 * @description: Get information about files on HDFS, including detailed metadata
 */
@Test
public void list_Files() throws Exception {
    RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(new Path("/hdfsapi/data/20201011"), true);
    while (files.hasNext()) {
        // LocatedFileStatus defines a FileStatus that includes a file's block locations.
        LocatedFileStatus fileStatus = files.next();
        // Get the path of the file in HDFS
        String path = fileStatus.getPath().toString();
        // Get the block size of the file
        long blockSize = fileStatus.getBlockSize();
        // Get the length of this file, in bytes
        long len = fileStatus.getLen();
        // Get the FsPermission associated with the file, e.g. "rwxrwxrwx"
        FsPermission permission = fileStatus.getPermission();
        // Is this a directory?
        String isDir = fileStatus.isDirectory() ? "directory" : "file";
        // Get the file's block locations
        BlockLocation[] locations = fileStatus.getBlockLocations();
        System.out.println("path:" + path);
        System.out.println("blockSize:" + blockSize);
        System.out.println("len:" + len);
        System.out.println("permission:" + permission);
        System.out.println("isDir:" + isDir);
        /**
         * A BlockLocation represents the network location of a block, the hosts
         * that hold block replicas, and other block metadata (e.g. the file
         * offset of the block, its length, whether it is corrupt, etc).
         */
        for (BlockLocation location : locations) {
            String[] hosts = location.getHosts();
            for (String host : hosts) {
                System.out.println("HOST: " + host + " .......................");
            }
        }
    }
}
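listFiles only returns files (here recursively, because of the true flag). To list the direct children of a directory, including subdirectories, listStatus can be used instead; a minimal sketch, reusing the fileSystem field from setUp and the /hdfsapi directory used above:

    // List the immediate children of a directory (files and subdirectories, non-recursive)
    FileStatus[] statuses = fileSystem.listStatus(new Path("/hdfsapi"));
    for (FileStatus status : statuses) {
        System.out.println((status.isDirectory() ? "directory" : "file") + ": " + status.getPath());
    }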

Operate on HDFS using IO streams

/*
 * @description: Copy a local file to HDFS using IO streams
 */
@Test
public void copyFromLocalFileIO() throws Exception {
    BufferedInputStream inputStream = new BufferedInputStream(new FileInputStream("data/wcfile.txt"));
    // the target directory needs to be created on HDFS first
    FSDataOutputStream out = fileSystem.create(new Path("/hdfsapi2/wcfile.txt"));
    IOUtils.copyBytes(inputStream, out, 1024);
    IOUtils.closeStream(inputStream);
    IOUtils.closeStream(out);
}
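IOUtils.copyBytes also has an overload whose last argument closes both streams once the copy finishes, which makes the two explicit closeStream calls unnecessary; a small sketch of the same upload:

    BufferedInputStream inputStream = new BufferedInputStream(new FileInputStream("data/wcfile.txt"));
    FSDataOutputStream out = fileSystem.create(new Path("/hdfsapi2/wcfile.txt"));
    // the final true tells copyBytes to close both streams itself
    IOUtils.copyBytes(inputStream, out, 1024, true);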

Copy a file from HDFS to local

/*
 * @description: Copy a file from HDFS to local
 */
@Test
public void copyToLocalFile() throws Exception {
    Path src = new Path("/hdfsapi");
    Path des = new Path("output/data");
    fileSystem.copyToLocalFile(src, des);
}
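On a Windows client without the Hadoop native binaries, copyToLocalFile may fail while writing the local .crc checksum file; the four-argument overload can fall back to the raw local filesystem instead. A sketch with the same paths (delSrc = false keeps the source on HDFS, useRawLocalFileSystem = true skips the checksum file):

    fileSystem.copyToLocalFile(false, new Path("/hdfsapi"), new Path("output/data"), true);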

Requirement: download each block of a file separately

The file on HDFS is a compressed archive of roughly 193 MB; the tests below simulate downloading it in separate parts.

/**
 * Requirement: download each block separately.
 * Here the file is simply split into three parts:
 * part0: the first 128 MB (offset 0 - 134217728)
 * part1: the next 50 MB (offset 134217728 - 186646528)
 * part2: the rest, from offset 186646528 (178 MB) to the end of the file
 */

/*
 * @description: Copy the file to local using IO streams - download the first block
 */
@Test
public void copyToLocalFileWidthIO() throws Exception {
    FSDataInputStream in = fileSystem.open(new Path("/hdfsapi/spark-2.2.0-bin-hadoop2.6.tgz"));
    FileOutputStream out = new FileOutputStream(new File("D:/WCTest/spark.tar.gz.part0"));
    byte[] buffer = new byte[1024];
    // copy the first 128 MB (128 * 1024 chunks of 1 KB);
    // readFully guarantees each 1 KB chunk is filled completely
    for (int i = 0; i < 1024 * 128; i++) {
        in.readFully(buffer);
        out.write(buffer);
    }
    IOUtils.closeStream(in);
    IOUtils.closeStream(out);
}

/*
 * @description: Copy the file to local using IO streams - download the second block
 */
@Test
public void copyToLocalFileWidthIO02() throws Exception {
    FSDataInputStream in = fileSystem.open(new Path("/hdfsapi/spark-2.2.0-bin-hadoop2.6.tgz"));
    FileOutputStream out = new FileOutputStream(new File("D:/WCTest/spark.tar.gz.part1"));

    // seek to the read offset: 128 MB
    in.seek(1024 * 1024 * 128);

    byte[] buffer = new byte[1024];
    // copy the next 50 MB (50 * 1024 chunks of 1 KB)
    for (int i = 0; i < 1024 * 50; i++) {
        in.readFully(buffer);
        out.write(buffer);
    }
    IOUtils.closeStream(in);
    IOUtils.closeStream(out);
}

/*
 * @description: Copy the file to local using IO streams - download the third block
 */
@Test
public void copyToLocalFileWidthIO03() throws Exception {
    FSDataInputStream in = fileSystem.open(new Path("/hdfsapi/spark-2.2.0-bin-hadoop2.6.tgz"));
    FileOutputStream out = new FileOutputStream(new File("D:/WCTest/spark.tar.gz.part2"));

    // seek to the read offset: 178 MB (the first two parts cover 128 MB + 50 MB)
    in.seek(1024 * 1024 * 178);

    // copy everything from the offset to the end of the file
    IOUtils.copyBytes(in, out, 1024);
    IOUtils.closeStream(in);
    IOUtils.closeStream(out);

    /**
     * Afterwards the parts can be concatenated on Windows with the type command:
     * type spark.tar.gz.part1 >> spark.tar.gz.part0
     * type spark.tar.gz.part2 >> spark.tar.gz.part0
     */
}
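The part boundaries above are hard-coded for this particular file. HDFS can also report the real block boundaries, so the same split could be driven by the actual block metadata; a minimal sketch, reusing the fileSystem field from setUp and the same file path:

    Path path = new Path("/hdfsapi/spark-2.2.0-bin-hadoop2.6.tgz");
    FileStatus status = fileSystem.getFileStatus(path);
    // one BlockLocation per block; offset and length give the exact byte range to download
    BlockLocation[] blocks = fileSystem.getFileBlockLocations(status, 0, status.getLen());
    for (BlockLocation block : blocks) {
        System.out.println("offset=" + block.getOffset() + ", length=" + block.getLength());
    }

On Linux or macOS the downloaded parts can be concatenated with cat instead of type, for example: cat spark.tar.gz.part0 spark.tar.gz.part1 spark.tar.gz.part2 > spark-2.2.0-bin-hadoop2.6.tgz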